use crate::error::{internal, StreamError};
use crate::stream_router::{FromRedisStream, RedisMap};
use anyhow::anyhow;
use appflowy_proto::Rid;
use bytes::Bytes;
use collab::core::awareness::AwarenessUpdate;
use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode;
use collab_entity::CollabType;
use redis::streams::StreamId;
use redis::{cmd, Cmd, FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
use uuid::Uuid;

/// The [MessageId] generated by XADD has two parts: a timestamp and a sequence number, separated by
/// a hyphen (-). The timestamp is based on the server's time when the message is added, and the
/// sequence number is used to differentiate messages added at the same millisecond.
///
///  If multiple messages are added within the same millisecond, Redis increments the sequence number
/// for each subsequent message
///
/// An example message ID might look like this: 1631020452097-0. In this example, 1631020452097 is
/// the timestamp in milliseconds, and 0 is the sequence number.
#[derive(Debug, Copy, Clone, Default, Ord, PartialOrd, Eq, PartialEq)]
pub struct MessageId {
  pub timestamp_ms: u64,
  pub sequence_number: u16,
}

impl MessageId {
  pub fn new(timestamp_ms: u64, sequence_number: u16) -> Self {
    MessageId {
      timestamp_ms,
      sequence_number,
    }
  }
}

impl Display for MessageId {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    write!(f, "{}-{}", self.timestamp_ms, self.sequence_number)
  }
}

impl From<Rid> for MessageId {
  fn from(rid: Rid) -> Self {
    MessageId {
      timestamp_ms: rid.timestamp,
      sequence_number: rid.seq_no,
    }
  }
}

impl From<MessageId> for Rid {
  fn from(val: MessageId) -> Self {
    Rid {
      timestamp: val.timestamp_ms,
      seq_no: val.sequence_number,
    }
  }
}

impl TryFrom<&[u8]> for MessageId {
  type Error = StreamError;

  fn try_from(s: &[u8]) -> Result<Self, Self::Error> {
    let s = std::str::from_utf8(s)?;
    Self::try_from(s)
  }
}

impl TryFrom<&str> for MessageId {
  type Error = StreamError;

  fn try_from(s: &str) -> Result<Self, Self::Error> {
    let parts: Vec<_> = s.splitn(2, '-').collect();

    if parts.len() != 2 {
      return Err(StreamError::InvalidFormat);
    }

    // Directly parse without intermediate assignment.
    let timestamp_ms = u64::from_str(parts[0])?;
    let sequence_number = u16::from_str(parts[1])?;

    Ok(MessageId {
      timestamp_ms,
      sequence_number,
    })
  }
}

impl TryFrom<String> for MessageId {
  type Error = StreamError;

  fn try_from(s: String) -> Result<Self, Self::Error> {
    Self::try_from(s.as_str())
  }
}

impl FromRedisValue for MessageId {
  fn from_redis_value(v: &Value) -> RedisResult<Self> {
    match v {
      Value::BulkString(stream_key) => MessageId::try_from(stream_key.as_slice()).map_err(|_| {
        RedisError::from((
          redis::ErrorKind::TypeError,
          "invalid stream key",
          format!("{:?}", stream_key),
        ))
      }),
      _ => Err(internal("expecting Value::Data")),
    }
  }
}

impl ToRedisArgs for MessageId {
  fn write_redis_args<W>(&self, out: &mut W)
  where
    W: ?Sized + RedisWrite,
  {
    out.write_arg_fmt(self)
  }
}

/// A message in the Redis stream. It's the same as [StreamBinary] but with additional metadata.
#[derive(Debug, Clone)]
pub struct StreamMessage {
  pub data: Bytes,
  /// only applicable when reading from redis
  pub id: MessageId,
}

impl FromRedisValue for StreamMessage {
  // Optimized parsing function
  fn from_redis_value(v: &Value) -> RedisResult<Self> {
    let bulk = bulk_from_redis_value(v)?;
    if bulk.len() != 2 {
      return Err(RedisError::from((
        redis::ErrorKind::TypeError,
        "Invalid length",
        format!(
          "Expected length of 2 for the outer bulk value, but got:{}",
          bulk.len()
        ),
      )));
    }

    let id = MessageId::from_redis_value(&bulk[0])?;
    let fields = bulk_from_redis_value(&bulk[1])?;
    if fields.len() != 2 {
      return Err(RedisError::from((
        redis::ErrorKind::TypeError,
        "Invalid length",
        format!(
          "Expected length of 2 for the bulk value, but got {}",
          fields.len()
        ),
      )));
    }

    verify_field(&fields[0], "data")?;
    let raw_data = Vec::<u8>::from_redis_value(&fields[1])?;

    Ok(StreamMessage {
      data: Bytes::from(raw_data),
      id,
    })
  }
}

impl TryFrom<StreamId> for StreamMessage {
  type Error = StreamError;

  fn try_from(value: StreamId) -> Result<Self, Self::Error> {
    let id = MessageId::try_from(value.id.as_str())?;
    let data = value
      .get("data")
      .ok_or(StreamError::UnexpectedValue("data".to_string()))?;
    Ok(Self { data, id })
  }
}

#[derive(Debug)]
pub struct StreamBinary(pub Vec<u8>);

impl From<StreamMessage> for StreamBinary {
  fn from(m: StreamMessage) -> Self {
    Self(m.data.to_vec())
  }
}

impl Deref for StreamBinary {
  type Target = Vec<u8>;

  fn deref(&self) -> &Self::Target {
    &self.0
  }
}

impl StreamBinary {
  pub fn into_tuple_array(self) -> [(&'static str, Vec<u8>); 1] {
    static DATA: &str = "data";
    [(DATA, self.0)]
  }
}

impl TryFrom<Vec<u8>> for StreamBinary {
  type Error = StreamError;

  fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
    Ok(Self(value))
  }
}

impl TryFrom<&[u8]> for StreamBinary {
  type Error = StreamError;

  fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
    Ok(Self(value.to_vec()))
  }
}

fn verify_field(field: &Value, expected: &str) -> RedisResult<()> {
  let field_str = String::from_redis_value(field)?;
  if field_str != expected {
    return Err(RedisError::from((
      redis::ErrorKind::TypeError,
      "Invalid field",
      format!("Expected '{}', found '{}'", expected, field_str),
    )));
  }
  Ok(())
}

pub struct RedisString(String);
impl FromRedisValue for RedisString {
  fn from_redis_value(v: &Value) -> RedisResult<Self> {
    match v {
      Value::BulkString(bytes) => Ok(RedisString(String::from_utf8(bytes.to_vec())?)),
      Value::SimpleString(str) => Ok(RedisString(str.clone())),
      _ => Err(internal("expecting Value::Data")),
    }
  }
}

impl Display for RedisString {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    write!(f, "{}", self.0.clone())
  }
}

fn bulk_from_redis_value(v: &Value) -> Result<&Vec<Value>, RedisError> {
  match v {
    Value::Array(b) => Ok(b),
    Value::Set(b) => Ok(b),
    _ => Err(internal("expecting Value::Bulk")),
  }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum CollabControlEvent {
  Open {
    workspace_id: String,
    object_id: String,
    collab_type: CollabType,
    doc_state: Vec<u8>,
  },
  Close {
    object_id: String,
  },
}

impl Display for CollabControlEvent {
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    match self {
      CollabControlEvent::Open {
        workspace_id: _,
        object_id,
        collab_type,
        doc_state: _,
      } => f.write_fmt(format_args!(
        "Open collab: object_id:{}|collab_type:{:?}",
        object_id, collab_type,
      )),
      CollabControlEvent::Close { object_id } => {
        f.write_fmt(format_args!("Close collab: object_id:{}", object_id))
      },
    }
  }
}

impl CollabControlEvent {
  pub fn encode(&self) -> Result<Vec<u8>, serde_json::Error> {
    serde_json::to_vec(self)
  }

  pub fn decode(data: &[u8]) -> Result<Self, serde_json::Error> {
    serde_json::from_slice(data)
  }
}

impl TryFrom<CollabControlEvent> for StreamBinary {
  type Error = StreamError;

  fn try_from(value: CollabControlEvent) -> Result<Self, Self::Error> {
    let raw_data = value.encode()?;
    Ok(StreamBinary(raw_data))
  }
}

pub struct CollabStreamUpdate {
  pub data: Vec<u8>, // yrs::Update::encode_v1
  pub sender: CollabOrigin,
  pub flags: UpdateFlags,
}

impl CollabStreamUpdate {
  pub fn new<B, F>(data: B, sender: CollabOrigin, flags: F) -> Self
  where
    B: Into<Vec<u8>>,
    F: Into<UpdateFlags>,
  {
    CollabStreamUpdate {
      data: data.into(),
      sender,
      flags: flags.into(),
    }
  }

  pub fn into_update(self) -> Result<collab::preclude::Update, StreamError> {
    let bytes = if self.flags.is_compressed() {
      zstd::decode_all(std::io::Cursor::new(self.data))?
    } else {
      self.data
    };
    let update = if self.flags.is_v1_encoded() {
      collab::preclude::Update::decode_v1(&bytes)?
    } else {
      collab::preclude::Update::decode_v2(&bytes)?
    };
    Ok(update)
  }
}

impl From<UpdateStreamMessage> for CollabStreamUpdate {
  fn from(value: UpdateStreamMessage) -> Self {
    CollabStreamUpdate {
      data: value.update.to_vec(),
      sender: value.sender,
      flags: value.update_flags.into(),
    }
  }
}

impl<'a> TryFrom<&'a HashMap<String, redis::Value>> for CollabStreamUpdate {
  type Error = StreamError;

  fn try_from(fields: &'a HashMap<String, Value>) -> Result<Self, Self::Error> {
    let sender = match fields.get("sender") {
      None => CollabOrigin::Empty,
      Some(sender) => {
        let raw_origin = String::from_redis_value(sender)?;
        CollabOrigin::from_str(&raw_origin)
          .map_err(|e| StreamError::UnexpectedValue(e.to_string()))?
      },
    };
    let flags = match fields.get("flags") {
      None => UpdateFlags::default(),
      Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(),
    };
    let data_raw = fields
      .get("data")
      .ok_or_else(|| internal("expecting field `data`"))?;
    let data: Vec<u8> = FromRedisValue::from_redis_value(data_raw)?;
    Ok(CollabStreamUpdate {
      data,
      sender,
      flags,
    })
  }
}

impl FromRedisStream for (MessageId, CollabStreamUpdate) {
  type Error = StreamError;

  fn from_redis_stream(msg_id: &str, fields: &RedisMap) -> Result<Self, Self::Error>
  where
    Self: Sized,
  {
    let message_id = MessageId::try_from(msg_id)?;
    let stream_update = CollabStreamUpdate::try_from(fields)?;
    Ok((message_id, stream_update))
  }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AwarenessStreamUpdate {
  pub data: AwarenessUpdate,
  pub sender: CollabOrigin,
}

#[repr(transparent)]
#[derive(Copy, Clone, Eq, PartialEq, Default)]
pub struct UpdateFlags(u8);

impl From<appflowy_proto::UpdateFlags> for UpdateFlags {
  fn from(flags: appflowy_proto::UpdateFlags) -> Self {
    match flags {
      appflowy_proto::UpdateFlags::Lib0v1 => UpdateFlags(0),
      appflowy_proto::UpdateFlags::Lib0v2 => UpdateFlags(Self::IS_V2_ENCODED),
    }
  }
}

impl UpdateFlags {
  /// Flag bit to mark if update is encoded using [EncoderV2] (if set) or [EncoderV1] (if clear).
  pub const IS_V2_ENCODED: u8 = 0b0000_0001;
  /// Flag bit to mark if update is compressed.
  pub const IS_COMPRESSED: u8 = 0b0000_0010;

  #[inline]
  pub fn is_v2_encoded(&self) -> bool {
    self.0 & Self::IS_V2_ENCODED != 0
  }

  #[inline]
  pub fn is_v1_encoded(&self) -> bool {
    !self.is_v2_encoded()
  }

  #[inline]
  pub fn is_compressed(&self) -> bool {
    self.0 & Self::IS_COMPRESSED != 0
  }
}

impl ToRedisArgs for UpdateFlags {
  #[inline]
  fn write_redis_args<W>(&self, out: &mut W)
  where
    W: ?Sized + RedisWrite,
  {
    self.0.write_redis_args(out)
  }
}

impl From<u8> for UpdateFlags {
  #[inline]
  fn from(value: u8) -> Self {
    UpdateFlags(value)
  }
}

impl Display for UpdateFlags {
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    if !self.is_v2_encoded() {
      write!(f, ".v1")?;
    } else {
      write!(f, ".v2")?;
    }

    if self.is_compressed() {
      write!(f, ".zstd")?;
    }

    Ok(())
  }
}

#[derive(Debug, PartialEq)]
pub struct UpdateStreamMessage {
  pub last_message_id: Rid,
  pub sender: CollabOrigin,
  pub object_id: Uuid,
  pub collab_type: CollabType,
  pub update_flags: appflowy_proto::UpdateFlags,
  pub update: Bytes,
}

impl UpdateStreamMessage {
  pub fn stream_key(workspace_id: &Uuid) -> String {
    format!("af:u:{}", workspace_id)
  }

  pub fn prepare_command(
    stream_key: &str,
    object_id: &Uuid,
    collab_type: CollabType,
    sender: &CollabOrigin,
    update: Vec<u8>,
    flag: UpdateFlags,
  ) -> Cmd {
    let mut cmd = cmd("XADD");
    cmd
      .arg(stream_key)
      .arg("*")
      .arg("oid")
      .arg(object_id)
      .arg("ct")
      .arg(collab_type as i32)
      .arg("sender")
      .arg(sender.to_string())
      .arg("data")
      .arg(update.to_vec())
      .arg("flags")
      .arg(flag);
    cmd
  }
}

impl FromRedisStream for UpdateStreamMessage {
  type Error = anyhow::Error;
  fn from_redis_stream(msg_id: &str, fields: &RedisMap) -> Result<Self, Self::Error> {
    let last_message_id = Rid::from_str(msg_id).map_err(|err| anyhow!("{}", err))?;
    let object_id = fields
      .get("oid")
      .ok_or_else(|| anyhow!("expecting field `oid`"))?;
    let object_id = Uuid::from_redis_value(object_id).map_err(|err| anyhow!("{}", err))?;
    let collab_type = fields
      .get("ct")
      .ok_or_else(|| anyhow!("expecting field `ct`"))?;
    let collab_type = CollabType::from(i32::from_redis_value(collab_type)?);
    let sender = fields
      .get("sender")
      .ok_or_else(|| anyhow!("expecting field `sender`"))?;
    let sender = CollabOrigin::from_str(&String::from_redis_value(sender)?)?;
    let update_flags = match fields.get("flags") {
      None => appflowy_proto::UpdateFlags::default(),
      Some(flags) => u8::from_redis_value(flags).unwrap_or(0).try_into()?,
    };
    let update = fields
      .get("data")
      .ok_or_else(|| anyhow!("expecting field `data`"))?;
    let update: Bytes = FromRedisValue::from_redis_value(update)?;
    Ok(UpdateStreamMessage {
      last_message_id,
      sender,
      object_id,
      collab_type,
      update_flags,
      update,
    })
  }
}
